-
Notifications
You must be signed in to change notification settings - Fork 299
feat(datafusion): Implement insert_into
for IcebergTableProvider
#1600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
} else { | ||
self.table.clone() | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me wonder if storing table directly in the IcebergTableProvider
is correct... We could get a stale table if the provider doesn't have a catalog table.
Iceberg-java has a refresh()
interface which uses TableOperation
to refresh metadata. In iceberg-rs we don't have TableOperation
and need to rely on catalog to refresh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a PR to add that functionality (including the refresh): #1297
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @phillipleblanc , thanks for pointing me to your change! Your change makes sense to me, but I was thinking of adding something like this to impl Table
directly
pub async fn refresh(&mut self, catalog: &dyn Catalog) -> Result<Self>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this maybe a problem, but I don't think we should do it here? Creating another issue to track this maybe a better approach.
_insert_op: InsertOp, | ||
) -> DFResult<Arc<dyn ExecutionPlan>> { | ||
if !self | ||
.table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we should refresh the table here and every otherself.table
usages in IcebergTableProvider
, but I think we should fix that in a separate PR if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @CTTY for this pr! Generally LGTM, just one minor point.
} else { | ||
self.table.clone() | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this maybe a problem, but I don't think we should do it here? Creating another issue to track this maybe a better approach.
_insert_op: InsertOp, | ||
) -> DFResult<Arc<dyn ExecutionPlan>> { | ||
if !self | ||
.table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable to me.
@@ -432,3 +432,370 @@ async fn test_metadata_table() -> Result<()> { | |||
|
|||
Ok(()) | |||
} | |||
|
|||
#[tokio::test] | |||
async fn test_insert_into() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for these tests, it would be much easier if we have sql logic tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will jump back on this after #1621 is completed
Which issue does this PR close?
insert_into
forIcebergTableProvider
#1540What changes are included in this PR?
catalog
toIcebergTableProvider
as optionalIcebergTableProvider::scan
insert_into
forIcebergTableProvider
using write node and commit node for non-partitioned tablesAre these changes tested?
Added tests